import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../../common/test_page.dart';
import '../../utils.dart';

Stream<int> _getStream() =>
    Stream<int>.periodic(const Duration(milliseconds: 20), (count) => count)
        .take(5);

Stream<int> _getSampleStream() =>
    Stream<int>.periodic(const Duration(milliseconds: 35), (count) => count)
        .take(10);

class SampleTestPage extends TestPage {
  SampleTestPage(super.title) {
    test('Rx.sample', () async {
      final stream = _getStream().sample(_getSampleStream());

      expect(stream);
    });

    test('Rx.sample.reusable', () async {
      final transformer = SampleStreamTransformer<int>(
              (_) => _getSampleStream().asBroadcastStream());
      final streamA = _getStream().transform(transformer);
      final streamB = _getStream().transform(transformer);

      expect(streamA);
      expect(streamB);
    });

    test('Rx.sample.onDone', () async {
      final stream = Stream.value(1).sample(Stream<void>.empty());

      expect(stream);
    });

    test('Rx.sample.shouldClose', () async {
      final controller = StreamController<int>();

      controller.stream
          .sample(Stream<void>.empty()) // should trigger onDone
          .listen(null, onDone: (() => expect(true)));

      controller.add(0);
      controller.add(1);
      controller.add(2);
      controller.add(3);

      scheduleMicrotask(controller.close);
      expect(controller);
    });

    test('Rx.sample.asBroadcastStream', () async {
      final stream = _getStream()
          .asBroadcastStream()
          .sample(_getSampleStream().asBroadcastStream());

      // listen twice on same stream
      stream.listen(null);
      stream.listen(null);
      // code should reach here
      expect(true);
    });

    test('Rx.sample.error.shouldThrowA', () async {
      final streamWithError =
      Stream<void>.error(Exception()).sample(_getSampleStream());

      streamWithError.listen(null,
          onError: (Object e, StackTrace s) {
            expect(e);
          });
    });

    test('Rx.sample.error.shouldThrowB', () async {
      final streamWithError = Stream.value(1)
          .sample(Stream<void>.error(Exception('Catch me if you can!')));

      streamWithError.listen(null,
          onError: (Object e, StackTrace s) {
            expect(e);
          });
      expect('如果只看到我说明没有抓到错误');
    });

    test('Rx.sample.pause.resume', () async {
      final controller = StreamController<int>();
      late StreamSubscription<int> subscription;

      subscription = _getStream()
          .sample(_getSampleStream())
          .listen(controller.add, onDone: () {
        controller.close();
        subscription.cancel();
      });

      expect(controller.stream);

      subscription.pause();
      subscription.resume();
    });

    test('Rx.sample.nullable', () {
      nullableTest<String?>(
            (s) => s.sample(_getSampleStream()),
      );
    });

    test('Rx.sampleTime', () async {
      final stream = _getStream().sampleTime(const Duration(milliseconds: 35));

      expect(stream);
    });

    test('Rx.sampleTime.reusable', () async {
      final transformer = SampleStreamTransformer<int>((_) =>
          TimerStream<bool>(true, const Duration(milliseconds: 35))
              .asBroadcastStream());

      expect(
        _getStream().transform(transformer).drain<void>(),
      );
      expect(
        _getStream().transform(transformer).drain<void>(),
      );
    });

    test('Rx.sampleTime.onDone', () async {
      final stream = Stream.value(1).sampleTime(const Duration(seconds: 1));

      expect(stream);
    });

    test('Rx.sampleTime.shouldClose', () async {
      final controller = StreamController<int>();

      controller.stream
          .sampleTime(const Duration(seconds: 1)) // should trigger onDone
          .listen(null, onDone: (() => expect(true)));

      controller.add(0);
      controller.add(1);
      controller.add(2);
      controller.add(3);

      scheduleMicrotask(controller.close);
    });

    test('Rx.sampleTime.asBroadcastStream', () async {
      final stream = _getStream()
          .sampleTime(const Duration(milliseconds: 35))
          .asBroadcastStream();

      // listen twice on same stream
      stream.listen(null);
      stream.listen(null);
      // code should reach here
      expect(true);
    });

    test('Rx.sampleTime.error.shouldThrowA', () async {
      final streamWithError = Stream<void>.error(Exception())
          .sampleTime(const Duration(milliseconds: 35));

      streamWithError.listen(null,
          onError: (Object e, StackTrace s) {
            expect(e);
          });
    });

    test('Rx.sampleTime.pause.resume', () async {
      final controller = StreamController<int>();
      late StreamSubscription<int> subscription;

      subscription = _getStream()
          .sampleTime(const Duration(milliseconds: 35))
          .listen(controller.add, onDone: () {
        controller.close();
        subscription.cancel();
      });

      subscription.pause(Future<void>.delayed(const Duration(milliseconds: 50)));

      expect(
          controller.stream);
    });

    test('Rx.sampleTime.nullable', () {
      nullableTest<String?>(
            (s) => s.sampleTime(Duration.zero),
      );
    });

  }

}